package com.lazar.dw.dws

import java.lang
import java.util.concurrent.TimeUnit

import com.alibaba.fastjson.{JSON, JSONArray, JSONObject}
import com.lazar.model.{AdClick, BlackUser}
import com.lazar.utils.SourceKafka
import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.function.WindowFunction
//import org.apache.flink.streaming.api.functions.windowing.WindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector
/**
 * Requirement 4: display the blacklisted user ID, the ad ID and the click count.
 */
object BlackUserStatistics {
  /**
   * Entry point of the blacklist statistics job.
   *
   * Reads raw JSON log lines from the Kafka topic `eventlog`, turns every event named `ad`
   * into an [[AdClick]], counts the clicks per (uid, productId) in 10-second time windows and
   * prints every [[BlackUser]] whose count within a window is greater than 10.
   *
   * @param args command-line arguments (not used)
   */
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    val kafkaConsumer: FlinkKafkaConsumer[String] = new SourceKafka().getKafkaSource("eventlog")
    val rawLogs: DataStream[String] = env.addSource(kafkaConsumer)

    // Fields extracted per click: area / uid / productId / timestamp.
    //
    // One AdClick is emitted per "ad" event found in the line's "lagou_event" array. Lines
    // without any "ad" event emit nothing, so non-ad traffic is never counted as a click with
    // a null productId, and several "ad" events in one line are all counted instead of only
    // the last one.
    val adClickStream: DataStream[AdClick] = rawLogs.flatMap { (line: String) =>
      val record: JSONObject = JSON.parseObject(line)
      val attr: JSONObject = record.getJSONObject("attr")
      val area: String = attr.get("area").toString
      val uid: String = attr.get("uid").toString
      val events: JSONArray = record.getJSONArray("lagou_event")

      for {
        i <- 0 until events.size()
        event = JSON.parseObject(events.get(i).toString)
        // Literal on the left keeps the comparison safe when an event has no "name".
        if "ad".equals(event.get("name"))
      } yield {
        val productId: String = event.getJSONObject("json").get("product_id").toString
        // NOTE(review): treats "time" as microseconds; if the log carries epoch
        // milliseconds this should be TimeUnit.MILLISECONDS - confirm against the producer.
        val timestamp: Long =
          TimeUnit.MICROSECONDS.toSeconds(event.get("time").toString.toLong)
        AdClick(area, uid, productId, timestamp)
      }
    }

    // Click count per (user, ad) pair for each 10-second window.
    val clickCounts: DataStream[BlackUser] = adClickStream
      .keyBy(click => (click.uid, click.productId))
      .timeWindow(Time.seconds(10))
      .aggregate(new BlackAggFunc, new BlackWindowFunc)

    // A pair with more than 10 clicks in one window is reported as blacklisted.
    val result: DataStream[BlackUser] = clickCounts.filter(_.count > 10)
    result.print()

    env.execute()
  }

  /**
   * Incremental counter: the accumulator is the number of [[AdClick]] elements added so far,
   * and the result is that same number.
   */
  class BlackAggFunc extends AggregateFunction[AdClick, Long, Long] {

    /** A fresh accumulator starts at zero. */
    override def createAccumulator(): Long = {
      0L
    }

    /** Each added click raises the count by one; the click's fields are not inspected. */
    override def add(value: AdClick, accumulator: Long): Long = {
      1L + accumulator
    }

    /** The final result is the accumulated count itself. */
    override def getResult(accumulator: Long): Long = {
      accumulator
    }

    /** Two partial counts are combined by summing them. */
    override def merge(a: Long, b: Long): Long = {
      b + a
    }
  }

  /**
   * Wraps the count computed for a window into a [[BlackUser]].
   *
   * The key is the (uid, productId) pair chosen by the `keyBy` in `main`; the first element
   * of `input` is taken as the click count for that key.
   */
  class BlackWindowFunc extends WindowFunction[Long, BlackUser, (String, String), TimeWindow] {
    override def apply(key: (String, String), window: TimeWindow, input: Iterable[Long], out: Collector[BlackUser]): Unit = {
      val (userId, adId) = key
      // Only the first element of the input is read.
      val clickCount: Long = input.iterator.next()
      out.collect(BlackUser(userId, adId, clickCount))
    }
  }

}
